package com.bihua.iot.service.impl;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.bihua.common.core.domain.PageQuery;
import com.bihua.common.core.domain.model.LoginUser;
import com.bihua.common.core.page.TableDataInfo;
import com.bihua.common.exception.ServiceException;
import com.bihua.common.helper.LoginHelper;
import com.bihua.common.utils.BeanCopyUtils;
import com.bihua.common.utils.StringUtils;
import com.bihua.common.utils.redis.RedisUtils;
import com.bihua.iot.constant.Constants;
import com.bihua.iot.domain.Device;
import com.bihua.iot.domain.DeviceTopic;
import com.bihua.iot.domain.TdFields;
import com.bihua.iot.domain.TdSelect;
import com.bihua.iot.domain.TdTable;
import com.bihua.iot.domain.bo.DeviceBo;
import com.bihua.iot.domain.bo.ProductBo;
import com.bihua.iot.domain.bo.ProductServicesBo;
import com.bihua.iot.domain.vo.DeviceVo;
import com.bihua.iot.domain.vo.ProductServicesVo;
import com.bihua.iot.domain.vo.ProductVo;
import com.bihua.iot.enums.DeviceConnectStatus;
import com.bihua.iot.enums.DeviceTopicEnum;
import com.bihua.iot.enums.DeviceType;
import com.bihua.iot.mapper.DeviceMapper;
import com.bihua.iot.service.IDeviceLocationService;
import com.bihua.iot.service.IDeviceService;
import com.bihua.iot.service.IDeviceTopicService;
import com.bihua.iot.service.IProductService;
import com.bihua.iot.service.IProductServicesService;
import com.bihua.iot.service.MqttService;
import com.bihua.iot.service.TdEngineService;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Service implementation for edge device profile records.
 *
 * <p>Covers CRUD on the device table, registration of the basic MQTT topics of a new device,
 * creation and querying of the per-device TDengine sub-tables ("device shadow"), MQTT client
 * authentication and forced disconnection.
 *
 * @author bihua
 * @date 2023-06-16
 */
@RequiredArgsConstructor
@Slf4j
@Service
public class DeviceServiceImpl implements IDeviceService {

    /** Common prefix of every basic device topic. */
    private static final String TOPIC_PREFIX = "/v1/devices/";
    /** Label stored as publisher/subscriber when the edge device is the party. */
    private static final String PARTY_DEVICE = "边设备";
    /** Label stored as publisher/subscriber when the IoT platform is the party. */
    private static final String PARTY_PLATFORM = "物联网平台";
    /** Offset used to turn the caller supplied local date-times into epoch milliseconds (UTC+8). */
    private static final ZoneOffset SHADOW_QUERY_OFFSET = ZoneOffset.ofHours(8);
    /** Keeps only devices whose status is "ENABLE" (case-insensitive). */
    private static final Predicate<DeviceVo> ENABLED_DEVICE =
        deviceVo -> StringUtils.equalsIgnoreCase(deviceVo.getDeviceStatus(), "ENABLE");

    private final DeviceMapper baseMapper;
    private final IDeviceLocationService iDeviceLocationService;
    private final IDeviceTopicService iDeviceTopicService;
    private final IProductService iProductService;
    private final IProductServicesService iProductServicesService;
    private final TdEngineService tdEngineService;
    private final MqttService mqttService;

    /** Name of the TDengine database; defaults to "bihua" when the property is not configured. */
    @Value("${spring.datasource.dynamic.datasource.td.dbName:bihua}")
    private String dataBaseName;

    /**
     * Looks up one device by primary key.
     *
     * @param id primary key of the device
     * @return whatever {@code baseMapper.selectVoById} yields for that key
     */
    @Override
    public DeviceVo queryById(Long id) {
        return baseMapper.selectVoById(id);
    }

    /**
     * Queries a page of devices matching the filter.
     *
     * @param bo        filter values; blank fields are ignored
     * @param pageQuery paging parameters
     * @return the page wrapped as table data
     */
    @Override
    public TableDataInfo<DeviceVo> queryPageList(DeviceBo bo, PageQuery pageQuery) {
        LambdaQueryWrapper<Device> lqw = buildQueryWrapper(bo);
        Page<DeviceVo> result = baseMapper.selectPageList(pageQuery.build(), lqw);
        return TableDataInfo.build(result);
    }

    /**
     * Queries all devices matching the filter.
     *
     * @param bo filter values; blank fields are ignored
     * @return the matching devices
     */
    @Override
    public List<DeviceVo> queryList(DeviceBo bo) {
        return baseMapper.selectVoList(buildQueryWrapper(bo));
    }

    /**
     * Builds the query wrapper shared by the list and page queries. Every condition is only
     * applied when the corresponding filter value is not blank; user name and device name use
     * LIKE, all other fields use equality.
     */
    private LambdaQueryWrapper<Device> buildQueryWrapper(DeviceBo bo) {
        LambdaQueryWrapper<Device> lqw = Wrappers.lambdaQuery();
        lqw.eq(StringUtils.isNotBlank(bo.getClientId()), Device::getClientId, bo.getClientId());
        lqw.like(StringUtils.isNotBlank(bo.getUserName()), Device::getUserName, bo.getUserName());
        lqw.eq(StringUtils.isNotBlank(bo.getPassword()), Device::getPassword, bo.getPassword());
        lqw.eq(StringUtils.isNotBlank(bo.getAuthMode()), Device::getAuthMode, bo.getAuthMode());
        lqw.eq(StringUtils.isNotBlank(bo.getDeviceIdentification()), Device::getDeviceIdentification, bo.getDeviceIdentification());
        lqw.like(StringUtils.isNotBlank(bo.getDeviceName()), Device::getDeviceName, bo.getDeviceName());
        lqw.eq(StringUtils.isNotBlank(bo.getConnector()), Device::getConnector, bo.getConnector());
        lqw.eq(StringUtils.isNotBlank(bo.getDeviceDescription()), Device::getDeviceDescription, bo.getDeviceDescription());
        lqw.eq(StringUtils.isNotBlank(bo.getDeviceStatus()), Device::getDeviceStatus, bo.getDeviceStatus());
        lqw.eq(StringUtils.isNotBlank(bo.getConnectStatus()), Device::getConnectStatus, bo.getConnectStatus());
        lqw.eq(StringUtils.isNotBlank(bo.getIsWill()), Device::getIsWill, bo.getIsWill());
        lqw.eq(StringUtils.isNotBlank(bo.getDeviceTags()), Device::getDeviceTags, bo.getDeviceTags());
        lqw.eq(StringUtils.isNotBlank(bo.getProductIdentification()), Device::getProductIdentification, bo.getProductIdentification());
        lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), Device::getProtocolType, bo.getProtocolType());
        lqw.eq(StringUtils.isNotBlank(bo.getDeviceType()), Device::getDeviceType, bo.getDeviceType());
        lqw.eq(StringUtils.isNotBlank(bo.getRemark()), Device::getRemark, bo.getRemark());
        return lqw;
    }

    /**
     * Creates a device together with its location record, its basic topics and, for common
     * devices, its TDengine sub-tables.
     *
     * <p>Protocol type and device type are taken from the referenced product and written back
     * into {@code bo}; the generated id is written back as well.
     *
     * @param bo device data to insert
     * @return {@code true} when the device row was inserted
     * @throws ServiceException when the product identification does not match any product
     */
    @Override
    public Boolean insertByBo(DeviceBo bo) {
        Device add = BeanUtil.toBean(bo, Device.class);
        add.setConnectStatus(DeviceConnectStatus.INIT.getValue());
        validEntityBeforeSave(add);
        bo.setProtocolType(add.getProtocolType());
        bo.setDeviceType(add.getDeviceType());
        if (baseMapper.insert(add) <= 0) {
            return false;
        }
        bo.setId(add.getId());
        // A device may be submitted without location data; there is nothing to store then.
        if (ObjectUtil.isNotNull(bo.getDeviceLocation())) {
            iDeviceLocationService.insertByBo(bo.getDeviceLocation());
        }
        Map<String, String> topicMap = buildBasisTopics(bo);
        if (DeviceType.COMMON.getValue().equals(bo.getDeviceType())
            && !Boolean.TRUE.equals(this.createCommonDeviceTDSubtable(bo))) {
            log.error("创建普通设备TD子表失败");
        }
        saveBasisTopics(bo.getDeviceIdentification(), topicMap);
        return true;
    }

    /**
     * Builds the basic topics (topic -> remark) for a device: gateways get the topology topics
     * plus the data/command topics, common devices only the data/command topics, any other
     * device type gets none.
     */
    private Map<String, String> buildBasisTopics(DeviceBo bo) {
        Map<String, String> topicMap = new HashMap<>();
        String base = TOPIC_PREFIX + bo.getDeviceIdentification();
        boolean gateway = DeviceType.GATEWAY.getValue().equals(bo.getDeviceType());
        boolean common = !gateway && DeviceType.COMMON.getValue().equals(bo.getDeviceType());
        if (gateway) {
            topicMap.put(base + "/topo/add", "边设备添加子设备");
            topicMap.put(base + "/topo/addResponse", "物联网平台返回的添加子设备的响应");
            topicMap.put(base + "/topo/delete", "边设备删除子设备");
            topicMap.put(base + "/topo/deleteResponse", "物联网平台返回的删除子设备的响应");
            topicMap.put(base + "/topo/update", "边设备更新子设备状态");
            topicMap.put(base + "/topo/updateResponse", "物联网平台返回的更新子设备状态的响应");
        }
        if (gateway || common) {
            topicMap.put(base + "/datas", "边设备上报数据");
            topicMap.put(base + "/command", "物联网平台给设备或边设备下发命令");
            topicMap.put(base + "/commandResponse", "边设备返回给物联网平台的命令响应");
        }
        return topicMap;
    }

    /**
     * Tells whether the platform (rather than the device) publishes on the topic: that is the
     * case for "command" and for every "...Response" topic except "commandResponse".
     */
    private static boolean isPublishedByPlatform(String topic) {
        if (topic.endsWith("datas") || topic.endsWith("commandResponse")) {
            return false;
        }
        return topic.endsWith("Response") || topic.endsWith("command");
    }

    /** Persists the basic topics of a device in one batch; does nothing for an empty map. */
    private void saveBasisTopics(String deviceIdentification, Map<String, String> topicMap) {
        if (topicMap.isEmpty()) {
            return;
        }
        // Resolve the current user once instead of once per topic; tolerate a missing login.
        LoginUser loginUser = LoginHelper.getLoginUser();
        String createBy = ObjectUtil.isNull(loginUser) ? null : loginUser.getUsername();
        List<DeviceTopic> topics = new ArrayList<>(topicMap.size());
        for (Map.Entry<String, String> entry : topicMap.entrySet()) {
            boolean platformPublishes = isPublishedByPlatform(entry.getKey());
            DeviceTopic deviceTopic = new DeviceTopic();
            deviceTopic.setDeviceIdentification(deviceIdentification);
            deviceTopic.setType(DeviceTopicEnum.BASIS.getKey());
            deviceTopic.setTopic(entry.getKey());
            deviceTopic.setPublisher(platformPublishes ? PARTY_PLATFORM : PARTY_DEVICE);
            deviceTopic.setSubscriber(platformPublishes ? PARTY_DEVICE : PARTY_PLATFORM);
            deviceTopic.setRemark(entry.getValue());
            deviceTopic.setCreateBy(createBy);
            topics.add(deviceTopic);
        }
        iDeviceTopicService.insertBatch(topics);
    }

    /**
     * Updates a device and, when the device row was updated and location data is present, its
     * location record.
     *
     * @param bo device data to update
     * @return {@code true} when the device row was updated
     * @throws ServiceException when the product identification does not match any product
     */
    @Override
    public Boolean updateByBo(DeviceBo bo) {
        Device update = BeanUtil.toBean(bo, Device.class);
        validEntityBeforeSave(update);
        boolean updated = baseMapper.updateById(update) > 0;
        if (updated && ObjectUtil.isNotNull(bo.getDeviceLocation())) {
            iDeviceLocationService.updateByBo(bo.getDeviceLocation());
        }
        return updated;
    }

    /**
     * Validates an entity before it is saved: the referenced product must exist. Protocol type
     * and device type of the entity are overwritten with the values of that product.
     *
     * @throws ServiceException when no product matches the product identification
     */
    private void validEntityBeforeSave(Device entity) {
        //TODO add further validation such as unique constraints
        ProductVo productVo = iProductService.findOneByProductIdentification(entity.getProductIdentification());
        if (ObjectUtil.isNull(productVo)) {
            throw new ServiceException("产品标识不正确，请核查！");
        }
        entity.setProtocolType(productVo.getProtocolType());
        entity.setDeviceType(productVo.getProductType());
    }

    /**
     * Deletes devices by id.
     *
     * @param ids     ids of the devices to delete; {@code null} or empty deletes nothing
     * @param isValid whether business validation should run first (currently a no-op);
     *                {@code null} is treated as {@code false}
     * @return {@code true} when at least one row was deleted
     */
    @Override
    public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
        if (CollectionUtil.isEmpty(ids)) {
            // Nothing to delete; also avoids handing an empty id list to the mapper.
            return false;
        }
        if (Boolean.TRUE.equals(isValid)) {
            //TODO add business validation where required
        }
        //TODO delete the data of related tables
        return baseMapper.deleteBatchIds(ids) > 0;
    }

    /**
     * Finds a device by its MQTT client id.
     *
     * @param clientId client id of the device
     * @return the device, or {@code null} when none matches
     */
    @Override
    public DeviceVo findOneByClientId(String clientId) {
        Device device = baseMapper.findOneByClientId(clientId);
        return ObjectUtil.isNull(device) ? null : BeanCopyUtils.copy(device, DeviceVo.class);
    }

    /**
     * Finds a device by its device identification.
     *
     * @param deviceIdentification identification of the device
     * @return the device, or {@code null} when none matches
     */
    @Override
    public DeviceVo findOneByDeviceIdentification(String deviceIdentification) {
        Device device = baseMapper.findOneByDeviceIdentification(deviceIdentification);
        return ObjectUtil.isNull(device) ? null : BeanCopyUtils.copy(device, DeviceVo.class);
    }

    /**
     * Reads the shadow data of the given devices from their TDengine sub-tables.
     *
     * <p>Only devices whose status is "ENABLE" are considered. When both {@code startTime} and
     * {@code endTime} are non-empty the rows of that time range (column {@code ts}) are read,
     * otherwise the last row of each sub-table. Sub-tables without data are left out of the
     * result.
     *
     * @param ids       comma separated device ids; surrounding whitespace and empty tokens are ignored
     * @param startTime start of the range as local date-time text, interpreted at UTC+8; may be empty
     * @param endTime   end of the range as local date-time text, interpreted at UTC+8; may be empty
     * @return sub-table name -> rows, or {@code null} when no enabled device matches the ids
     * @throws NumberFormatException when a non-empty id token is not a number
     */
    @Override
    public Map<String, List<Map<String, Object>>> getDeviceShadow(String ids, String startTime, String endTime) {
        List<Long> idCollection = parseIds(ids);
        List<DeviceVo> devices = CollectionUtil.isEmpty(idCollection)
            ? new ArrayList<>()
            : baseMapper.selectVoBatchIds(idCollection).stream().filter(ENABLED_DEVICE).collect(Collectors.toList());
        if (CollectionUtil.isEmpty(devices)) {
            log.error("查询普通设备影子数据失败，普通设备不存在");
            return null;
        }
        // The time window is the same for every table, so it is parsed once up front.
        boolean ranged = StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime);
        long startMillis = ranged ? toEpochMilli(startTime) : 0L;
        long endMillis = ranged ? toEpochMilli(endTime) : 0L;

        Map<String, List<Map<String, Object>>> shadows = new HashMap<>();
        for (DeviceVo device : devices) {
            ProductBo productQuery = new ProductBo();
            productQuery.setProductIdentification(device.getProductIdentification());
            List<ProductVo> products = iProductService.queryList(productQuery);
            if (CollectionUtil.isEmpty(products)) {
                log.error("查询普通设备影子数据失败，设备对应的产品不存在");
                continue;
            }
            ProductServicesBo servicesQuery = new ProductServicesBo();
            servicesQuery.setProductIdentification(device.getProductIdentification());
            servicesQuery.setStatus("0");
            List<ProductServicesVo> services = iProductServicesService.queryList(servicesQuery);
            if (CollectionUtil.isEmpty(services)) {
                log.error("查询普通设备影子数据失败，普通设备services不存在");
                continue;
            }
            for (ProductServicesVo service : services) {
                // Must be the very name createCommonDeviceTDSubtable gave the sub-table.
                String shadowTableName = subTableName(superTableName(products.get(0), service), device.getDeviceIdentification());
                TdSelect selectDto = new TdSelect();
                selectDto.setDataBaseName(dataBaseName);
                selectDto.setTableName(shadowTableName);
                List<Map<String, Object>> rows;
                if (ranged) {
                    selectDto.setFieldName("ts");
                    selectDto.setStartTime(startMillis);
                    selectDto.setEndTime(endMillis);
                    rows = tdEngineService.selectByTimesTamp(selectDto);
                } else {
                    rows = tdEngineService.getLastData(selectDto);
                }
                if (CollectionUtil.isEmpty(rows)) {
                    log.error("查询普通设备影子数据失败，普通设备影子数据不存在");
                } else {
                    shadows.put(shadowTableName, rows);
                    log.info("查询普通设备影子数据成功，普通设备影子数据：{}", rows);
                }
            }
        }
        return shadows;
    }

    /** Splits a comma separated id string into ids; blank input yields an empty list. */
    private static List<Long> parseIds(String ids) {
        if (StringUtils.isBlank(ids)) {
            return new ArrayList<>();
        }
        return Arrays.stream(ids.split(","))
            .map(String::trim)
            .filter(token -> !token.isEmpty())
            .map(Long::valueOf)
            .collect(Collectors.toList());
    }

    /** Converts local date-time text into epoch milliseconds using the fixed UTC+8 offset. */
    private static long toEpochMilli(String localDateTime) {
        return DateUtil.parseLocalDateTime(localDateTime).toInstant(SHADOW_QUERY_OFFSET).toEpochMilli();
    }

    /** Super table naming rule: productType_productIdentification_serviceName. */
    private static String superTableName(ProductVo product, ProductServicesVo service) {
        return product.getProductType() + "_" + service.getProductIdentification() + "_" + service.getServiceName();
    }

    /** Sub-table naming rule: superTableName_deviceIdentification. */
    private static String subTableName(String superTableName, String deviceIdentification) {
        return superTableName + "_" + deviceIdentification;
    }

    /**
     * Creates the TDengine sub-tables of a common device, one per enabled (status "0") service
     * of its product. The device identification is used as the single tag value.
     *
     * <p>Creation is best effort: a failure for one service is logged and the remaining
     * services are still processed.
     *
     * @param device device whose sub-tables are created
     * @return {@code false} when the product does not exist or at least one sub-table could not
     *         be created, {@code true} otherwise (including when the product has no services)
     */
    public Boolean createCommonDeviceTDSubtable(DeviceBo device) {
        ProductBo productQuery = new ProductBo();
        productQuery.setProductIdentification(device.getProductIdentification());
        productQuery.setProtocolType(device.getProtocolType());
        List<ProductVo> products = iProductService.queryList(productQuery);
        if (CollectionUtil.isEmpty(products)) {
            log.error("刷新子设备数据模型失败，子设备产品不存在");
            return false;
        }
        ProductServicesBo servicesQuery = new ProductServicesBo();
        servicesQuery.setProductIdentification(device.getProductIdentification());
        servicesQuery.setStatus("0");
        List<ProductServicesVo> services = iProductServicesService.queryList(servicesQuery);
        if (CollectionUtil.isEmpty(services)) {
            // No service means no table to create.
            return true;
        }
        boolean allCreated = true;
        for (ProductServicesVo service : services) {
            String superTableName = superTableName(products.get(0), service);
            TdTable tableDto = new TdTable();
            tableDto.setDataBaseName(dataBaseName);
            tableDto.setSuperTableName(superTableName);
            tableDto.setTableName(subTableName(superTableName, device.getDeviceIdentification()));
            // Tag handling: the only tag value is the device identification.
            TdFields tag = new TdFields();
            tag.setFieldValue(device.getDeviceIdentification());
            List<TdFields> tagsFieldValues = new ArrayList<>();
            tagsFieldValues.add(tag);
            tableDto.setTagsFieldValues(tagsFieldValues);
            try {
                tdEngineService.createTable(tableDto);
                log.info("Create SuperTable Success: {}", tableDto.getTableName());
            } catch (Exception e) {
                allCreated = false;
                // Trailing throwable keeps the stack trace in the log.
                log.error("Create SuperTable Exception: {}", e.getMessage(), e);
            }
        }
        return allCreated;
    }

    /**
     * Updates the connection status of the device with the given client id.
     *
     * @param updatedConnectStatus new connection status
     * @param clientId             client id of the device
     * @return the value returned by the mapper update
     */
    @Override
    public int updateConnectStatusByClientId(String updatedConnectStatus, String clientId) {
        return baseMapper.updateConnectStatusByClientId(updatedConnectStatus, clientId);
    }

    /**
     * Authenticates an MQTT client against the device table. On success the device is cached in
     * Redis for 60-69 seconds and its connection status is set to online.
     *
     * @param clientIdentifier client id presented by the client
     * @param username         user name presented by the client
     * @param password         password presented by the client
     * @param deviceStatus     device status the device must have
     * @param protocolType     protocol type the device must have
     * @return {@code true} when a device matches all five values
     */
    @Override
    public Boolean clientAuthentication(String clientIdentifier, String username, String password, String deviceStatus, String protocolType) {
        final Device device = baseMapper.findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType(clientIdentifier, username, password, deviceStatus, protocolType);
        if (ObjectUtil.isNull(device)) {
            return false;
        }
        // Cache the device; one random digit (0-9) is added to the 60 second lifetime.
        Duration ttl = Duration.ofSeconds(60L + Long.parseLong(RandomUtil.randomNumbers(1)));
        RedisUtils.setCacheObject(Constants.DEVICE_RECORD_KEY + device.getDeviceIdentification(), BeanCopyUtils.copy(device, DeviceVo.class), ttl);
        // Mark the device as online.
        baseMapper.updateConnectStatusByClientId(DeviceConnectStatus.ONLINE.getValue(), clientIdentifier);
        return true;
    }

    /**
     * Asks the broker to close the connections of the given devices.
     *
     * @param ids ids of the devices to disconnect; {@code null} or empty disconnects nothing
     * @return {@code false} when no id was given or no device matches, {@code true} once the
     *         close request was sent (the broker result is only logged)
     */
    @Override
    public Boolean disconnect(Long[] ids) {
        if (ids == null || ids.length == 0) {
            return false;
        }
        final List<DeviceVo> deviceList = baseMapper.selectVoBatchIds(Arrays.asList(ids));
        if (CollectionUtil.isEmpty(deviceList)) {
            return false;
        }
        final List<String> clientIdentifiers = deviceList.stream().map(DeviceVo::getClientId).collect(Collectors.toList());
        final String result = mqttService.closeConnection(clientIdentifiers);
        log.info("主动断开设备ID: {} 连接 , Broker 处理结果: {}", clientIdentifiers, result);
        return true;
    }
}
